In [1]:
import pandas as pd
from pyspark.sql import *
from plotly.graph_objs import *
from plotly.offline import download_plotlyjs, init_notebook_mode, iplot
init_notebook_mode()
import plotly.graph_objs as go
import datetime
import numpy as np

import warnings
warnings.filterwarnings('ignore')
In [2]:
import os
import sys
module_path = os.path.abspath(os.path.join('../../instacart-ml'))
if module_path not in sys.path:
    sys.path.append(module_path)


import common_utility.ModelEvaluation as me
import common_utility.PlotlyObject as plt
In [3]:
from glob import glob
import pyspark.sql.functions as F

def read_spark_csv(spark, path):
    df = spark.read.option("header", "true").option("inferSchema", "true").csv(path)
    return df

spark = SparkSession.builder \
  .appName("My Spark Application")\
  .config("spark.master", "local[*]")\
  .config("spark.driver.memory", "10g")\
  .config("spark.executor.memory", "30g")\
  .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
logger = spark._jvm.org.apache.log4j
logger.LogManager.getRootLogger().setLevel(logger.Level.ERROR)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/17 21:11:40 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
In [4]:
data_list = glob("/Users/karenwang/PycharmProjects/instacart-ml/instacart-market-basket-analysis/data/*")
product = read_spark_csv(spark, data_list[0])
order = read_spark_csv(spark, data_list[1])
order_products_train = read_spark_csv(spark, data_list[2])
departments = read_spark_csv(spark, data_list[3])
aisles = read_spark_csv(spark, data_list[4])
order_products_prior = read_spark_csv(spark, data_list[5])
                                                                                
In [ ]:
#### Cohort 1 ####
# for every user, collect all previous purchase product_id as # of row in training set
order_schema = ['order_id', 'user_id', 'eval_set', 'order_dow', 'order_hour_of_day', 'days_since_prior_order']
train_user = order.filter(F.col("eval_set") == 'train').select("user_id").distinct()
order_product = order_products_prior\
        .unionByName(order_products_train)\
        .join(order.select(*order_schema), ['order_id'], 'inner')\
        .join(train_user, ['user_id'], 'inner')
cohort_df = order_product.filter(F.col("eval_set") == "prior").groupBy('user_id')\
            .agg(F.collect_set("product_id").alias("product_id"))\
            .withColumn("product_id", F.explode("product_id"))
cohort_schema = ["order_id", "product_id", 'user_id']
feature_schema = ['user_id', 'order_id', 'order_dow', 'order_hour_of_day', 'days_since_prior_order']

train_df = order_product.filter(F.col("eval_set") == 'train').select(*cohort_schema)\
            .join(cohort_df, ['user_id', 'product_id'], 'right')\
            .withColumn("reordered", F.when(F.col("order_id").isNotNull(), 1).otherwise(0)).drop("order_id")
output_df = order_product.filter(F.col("eval_set") == 'train') \
            .dropDuplicates(["user_id"]).select(*feature_schema) \
            .join(train_df, ['user_id'], 'inner')\
            .join(product.select("product_id", 'department_id'), ['product_id'], 'inner')

output_path = "/Users/karenwang/PycharmProjects/instacart-ml/instacart-market-basket-analysis/parquet/train_df.parquet"
output_df.write.mode("overwrite").parquet(output_path)

# Product level -> Order Level; User_id, Product_id, Reordered
In [7]:
df = pd.read_parquet(output_path)
df.columns
Out[7]:
Index(['product_id', 'user_id', 'order_id', 'order_dow', 'order_hour_of_day',
       'days_since_prior_order', 'reordered', 'department_id'],
      dtype='object')
In [ ]:
#### Cohort 2 ####
df = spark.read.parquet(output_path)

user_order = order.filter(F.col("eval_set") == 'prior')\
            .groupBy(['user_id']).agg(F.count_distinct("order_id").alias("user_order_num"))

# aisle id & reorder rate per product for each user
reorder_freq = order_products_prior\
    .join(order.select("user_id", "order_id"), ["order_id"], "left")\
    .groupBy(['user_id', "product_id"]).agg(F.count("order_id").alias("num_order"))\
    .join(user_order, ["user_id"], "left")\
    .withColumn("reorder_rate", F.round(F.col("num_order") / F.col("user_order_num"),2))\
    .join(product.select("product_id", "aisle_id"), ["product_id"], "inner")

temp = order.groupBy("user_id").agg(F.mean("days_since_prior_order").alias("mean_day"), 
                             F.stddev("days_since_prior_order").alias("std_day"))

# sacle since priror order for each product (based on each user)
scale_day_prior = order\
    .filter(F.col("eval_set") == "train")\
    .join(temp, ["user_id"], "inner")\
    .withColumn("scale_day_prior", (F.col("days_since_prior_order") - F.col("mean_day")) / F.col("std_day"))\
    .select("user_id", "scale_day_prior")

output_df2 = df.join(reorder_freq.select("product_id", "user_id", "reorder_rate", "aisle_id"), ["user_id", "product_id"], "left")\
    .join(scale_day_prior, ['user_id'], "left")\
    .drop("days_since_prior_order")

output_path2 = "/Users/karenwang/PycharmProjects/instacart-ml/instacart-market-basket-analysis/parquet/train_df2.parquet"
output_df2.write.mode("overwrite").parquet(output_path2)
In [8]:
df2 = pd.read_parquet(output_path2)
df2.columns
Out[8]:
Index(['user_id', 'product_id', 'order_id', 'order_dow', 'order_hour_of_day',
       'reordered', 'department_id', 'reorder_rate', 'aisle_id',
       'scale_day_prior'],
      dtype='object')
In [18]:
# Feature Engineering

from sklearn.preprocessing import OneHotEncoder

encoder = OneHotEncoder(sparse=False)
encoder_list = ['order_dow', 'order_hour_of_day', 'department_id']

encoder.fit(df[encoder_list])
encoded_data = encoder.transform(df[encoder_list])

columns = encoder.get_feature_names_out(encoder_list)

one_hot_encoded_df = pd.DataFrame(encoded_data, columns=columns)
df = pd.concat([df, one_hot_encoded_df], axis=1)
In [19]:
# train_test_split
test_ratio = 0.2
n_id = df['user_id'].nunique()
test_id = df['user_id'].drop_duplicates().sample(int(n_id *test_ratio)).tolist()
train_df = df[~df['user_id'].isin(test_id)].reset_index(drop=True)
test_df = df[df['user_id'].isin(test_id)].reset_index(drop=True)
positive_rate = train_df[train_df['reordered'] == 0].shape[0] / train_df[
    train_df['reordered'] == 1].shape[0] # ratio for imbalance data 

input_var_list = columns.tolist() + ['days_since_prior_order']
label = 'reordered'

train_x = train_df[input_var_list]
test_x = test_df[input_var_list]
train_y = train_df[label]
test_y = test_df[label]
In [20]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

logreg = LogisticRegression(class_weight='balanced', max_iter=100, n_jobs=-1, verbose=0)
logreg.fit(train_x, train_y)

test_df['log_prob'] = logreg.predict_proba(test_x)[:,1]
test_df['log_pred'] = test_df['log_prob'].apply(lambda x: 1 if x > 0.5 else 0)

train_df['log_prob'] = logreg.predict_proba(train_x)[:,1]
train_df['log_pred'] = train_df['log_prob'].apply(lambda x: 1 if x > 0.5 else 0)
/Users/karenwang/anaconda3/envs/mcgill_ml/lib/python3.11/site-packages/sklearn/linear_model/_logistic.py:458: ConvergenceWarning: lbfgs failed to converge (status=1):
STOP: TOTAL NO. of ITERATIONS REACHED LIMIT.

Increase the number of iterations (max_iter) or scale the data as shown in:
    https://scikit-learn.org/stable/modules/preprocessing.html
Please also refer to the documentation for alternative solver options:
    https://scikit-learn.org/stable/modules/linear_model.html#logistic-regression
  n_iter_i = _check_optimize_result(
In [21]:
import xgboost as xgb
clf = xgb.XGBClassifier(n_estimators=300, max_depth=6, n_jobs=-1, scale_pos_weight=positive_rate)
clf.fit(train_x, train_y)

# Predict the test set results
test_df['xgb_prob'] = clf.predict_proba(test_x)[:,1]
test_df['xgb_pred'] = test_df['xgb_prob'].apply(lambda x: 1 if x > 0.5 else 0)

train_df['xgb_prob'] = clf.predict_proba(train_x)[:,1]
train_df['xgb_pred'] = train_df['xgb_prob'].apply(lambda x: 1 if x > 0.5 else 0)
In [22]:
trace1 = me.create_roc_trace(train_df, label, 'log_prob', 'train_logistic')
trace2 = me.create_roc_trace(test_df, label, 'log_prob', 'test_logistic')
trace3 = me.create_roc_trace(train_df, label, 'xgb_prob', 'train_xgb')
trace4 = me.create_roc_trace(test_df, label, 'xgb_prob', 'test_xgb')
data = [trace1, trace2, trace3, trace4]

me.create_overlay_roc_curve(data)
In [23]:
t1 = me.ClassifierModelEvaluation(train_df, 'logistic_train', label=label, 
                             pred_col='log_pred', prob_col='log_prob').model_summary("logistic_train")
t2 = me.ClassifierModelEvaluation(test_df, 'logistic_test', label=label, 
                             pred_col='log_pred', prob_col='log_prob').model_summary("logistic_test")
t3 = me.ClassifierModelEvaluation(train_df, 'xgb_train', label=label, 
                             pred_col='log_pred', prob_col='xgb_prob').model_summary("xgb_train")
t4 = me.ClassifierModelEvaluation(test_df, 'xgb_test', label=label, 
                             pred_col='log_pred', prob_col='xgb_prob').model_summary("xgb_test")
pd.concat([t1, t2, t3, t4], axis=0).round(3)
Out[23]:
sample size sensitivity specificity f1_score accuracy precision auc model_name
0 6759511 0.710 0.459 0.212 0.484 0.125 0.616 logistic_train
0 1715150 0.706 0.461 0.212 0.485 0.125 0.615 logistic_test
0 6759511 0.710 0.459 0.212 0.484 0.125 0.639 xgb_train
0 1715150 0.706 0.461 0.212 0.485 0.125 0.621 xgb_test
In [24]:
threshold_list = [i/100 for i in range(40, 65)]
b1 = me.create_model_evaluation_by_threshold(train_df, threshold_list, 'logistic_train', label, 'log_prob')
b2 = me.create_model_evaluation_by_threshold(test_df, threshold_list, 'logistic_test', label, 'log_prob')
b3 = me.create_model_evaluation_by_threshold(train_df, threshold_list, 'xgb_train', label, 'xgb_prob')
b4 = me.create_model_evaluation_by_threshold(test_df, threshold_list, 'xgb_test', label, 'xgb_prob')
In [25]:
data = [plt.create_table_trace(i.round(3).drop('model_name', axis=1)) for i in [b1, b2, b3, b4]]
data[0].visible = True

var_list = ['logistic_train', 'logistic_test', 'xgb_train', 'xgb_test']
buttons = []
visible_list = plt.visible_true_false_list(len(var_list), 1)
for i in range(len(visible_list)):
    temp = {'label': var_list[i],'method': 'update', 'args': [{'visible': visible_list[i]}]}
    buttons.append(temp)

updatemenus = list([
            dict(active=-1,
                 x=0.0,
                 xanchor='left',
                 y=1.33,
                 yanchor='top',
                 direction='down',
                 buttons=buttons,
                 )
        ])

layout = go.Layout(title='<b>Model Performance - Threshold Table<b>',
                   updatemenus = updatemenus,
                       height=600,
                       width=900)
fig = go.Figure(data=data, layout=layout)
fig.show()

Next Steps:¶

  • Feature Engineering include adding new predictors.
  • Used hyperparameter tuning to improve the performance - Tried XGBoost: n_estimator to 500 and 1000, max_depth = 7, 8. But they did not improve the model much. We will try if feature engineering helps improve the performance.

Featuring Engineering¶

Consider adding the below feature and evaluate the model performance

  • aisle id
  • reorder rate for each user per product for each user
  • sacle since priror order for each product (based on each user)
In [9]:
df2.head()
Out[9]:
user_id product_id order_id order_dow order_hour_of_day reordered department_id reorder_rate aisle_id scale_day_prior
0 34 5134 698604 4 13 0 19 0.2 117 1.020123
1 34 16349 698604 4 13 0 4 0.2 83 1.020123
2 34 21783 698604 4 13 0 1 0.2 37 1.020123
3 34 4086 698604 4 13 0 4 0.2 16 1.020123
4 34 3957 698604 4 13 0 7 0.6 31 1.020123
In [10]:
# Feature Engineering

from sklearn.preprocessing import OneHotEncoder

encoder = OneHotEncoder(sparse=False)
encoder_list = ['order_dow', 'order_hour_of_day', 'department_id', 'aisle_id']

encoder.fit(df2[encoder_list])
encoded_data = encoder.transform(df2[encoder_list])

columns = encoder.get_feature_names_out(encoder_list)

one_hot_encoded_df2 = pd.DataFrame(encoded_data, columns=columns)
df2 = pd.concat([df2, one_hot_encoded_df2], axis=1)
In [11]:
# train_test_split
test_ratio = 0.2
n_id = df2['user_id'].nunique()
test_id = df2['user_id'].drop_duplicates().sample(int(n_id *test_ratio)).tolist()
train_df = df2[~df2['user_id'].isin(test_id)].reset_index(drop=True)
test_df = df2[df2['user_id'].isin(test_id)].reset_index(drop=True)
positive_rate = train_df[train_df['reordered'] == 0].shape[0] / train_df[
    train_df['reordered'] == 1].shape[0] # ratio for imbalance data 

input_var_list = columns.tolist() #+ ['days_since_prior_order']
label = 'reordered'

train_x = train_df[input_var_list]
test_x = test_df[input_var_list]
train_y = train_df[label]
test_y = test_df[label]
In [12]:
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

logreg = LogisticRegression(class_weight='balanced', max_iter=250, n_jobs=-1, verbose=0)
logreg.fit(train_x, train_y)

test_df['log_prob'] = logreg.predict_proba(test_x)[:,1]
test_df['log_pred'] = test_df['log_prob'].apply(lambda x: 1 if x > 0.5 else 0)

train_df['log_prob'] = logreg.predict_proba(train_x)[:,1]
train_df['log_pred'] = train_df['log_prob'].apply(lambda x: 1 if x > 0.5 else 0)
In [13]:
import xgboost as xgb
clf = xgb.XGBClassifier(n_estimators=300, max_depth=6, n_jobs=-1, scale_pos_weight=positive_rate)
clf.fit(train_x, train_y)

# Predict the test set results
test_df['xgb_prob'] = clf.predict_proba(test_x)[:,1]
test_df['xgb_pred'] = test_df['xgb_prob'].apply(lambda x: 1 if x > 0.5 else 0)

train_df['xgb_prob'] = clf.predict_proba(train_x)[:,1]
train_df['xgb_pred'] = train_df['xgb_prob'].apply(lambda x: 1 if x > 0.5 else 0)
In [14]:
trace1 = me.create_roc_trace(train_df, label, 'log_prob', 'train_logistic')
trace2 = me.create_roc_trace(test_df, label, 'log_prob', 'test_logistic')
trace3 = me.create_roc_trace(train_df, label, 'xgb_prob', 'train_xgb')
trace4 = me.create_roc_trace(test_df, label, 'xgb_prob', 'test_xgb')
data = [trace1, trace2, trace3, trace4]

me.create_overlay_roc_curve(data)
In [15]:
t1 = me.ClassifierModelEvaluation(train_df, 'logistic_train', label=label, 
                             pred_col='log_pred', prob_col='log_prob').model_summary("logistic_train")
t2 = me.ClassifierModelEvaluation(test_df, 'logistic_test', label=label, 
                             pred_col='log_pred', prob_col='log_prob').model_summary("logistic_test")
t3 = me.ClassifierModelEvaluation(train_df, 'xgb_train', label=label, 
                             pred_col='log_pred', prob_col='xgb_prob').model_summary("xgb_train")
t4 = me.ClassifierModelEvaluation(test_df, 'xgb_test', label=label, 
                             pred_col='log_pred', prob_col='xgb_prob').model_summary("xgb_test")
pd.concat([t1, t2, t3, t4], axis=0).round(3)
Out[15]:
sample size sensitivity specificity f1_score accuracy precision auc model_name
0 6766768 0.641 0.543 0.219 0.552 0.132 0.634 logistic_train
0 1707893 0.638 0.544 0.216 0.553 0.130 0.633 logistic_test
0 6766768 0.641 0.543 0.219 0.552 0.132 0.639 xgb_train
0 1707893 0.638 0.544 0.216 0.553 0.130 0.632 xgb_test
In [16]:
threshold_list = [i/100 for i in range(40, 65)]
b1 = me.create_model_evaluation_by_threshold(train_df, threshold_list, 'logistic_train', label, 'log_prob')
b2 = me.create_model_evaluation_by_threshold(test_df, threshold_list, 'logistic_test', label, 'log_prob')
b3 = me.create_model_evaluation_by_threshold(train_df, threshold_list, 'xgb_train', label, 'xgb_prob')
b4 = me.create_model_evaluation_by_threshold(test_df, threshold_list, 'xgb_test', label, 'xgb_prob')
In [17]:
data = [plt.create_table_trace(i.round(3).drop('model_name', axis=1)) for i in [b1, b2, b3, b4]]
data[0].visible = True

var_list = ['logistic_train', 'logistic_test', 'xgb_train', 'xgb_test']
buttons = []
visible_list = plt.visible_true_false_list(len(var_list), 1)
for i in range(len(visible_list)):
    temp = {'label': var_list[i],'method': 'update', 'args': [{'visible': visible_list[i]}]}
    buttons.append(temp)

updatemenus = list([
            dict(active=-1,
                 x=0.0,
                 xanchor='left',
                 y=1.33,
                 yanchor='top',
                 direction='down',
                 buttons=buttons,
                 )
        ])

layout = go.Layout(title='<b>Model Performance - Threshold Table<b>',
                   updatemenus = updatemenus,
                       height=600,
                       width=900)
fig = go.Figure(data=data, layout=layout)
fig.show()